# Source Generated with Decompyle++
# File: in.pyc (Python 2.6)
'''
pyinotify

@author: Sebastien Martini
@license: GPLv2+
@contact: seb@dbzteam.org
'''


class PyinotifyError(Exception):
    '''Indicates exceptions raised by a Pyinotify class.'''
    pass


class UnsupportedPythonVersionError(PyinotifyError):
    '''
    Raised for unsupported Python version.
    '''
    def __init__(self, version):
        '''
        @param version: Current Python version
        @type version: string
        '''
        PyinotifyError.__init__(self,
            'Python %s is unsupported, requires at least Python 2.4' % version)


class UnsupportedLibcVersionError(PyinotifyError):
    '''
    Raised for unsupported libc version.
    '''
    def __init__(self, version):
        '''
        @param version: Current Libc version
        @type version: string
        '''
        PyinotifyError.__init__(self,
            'Libc %s is unsupported, requires at least Libc 2.4' % version)


import sys

# Check Python version.
if sys.version < '2.4':
    raise UnsupportedPythonVersionError(sys.version)

import threading
import os
import select
import struct
import fcntl
import errno
import termios
import array
import logging
import atexit
from collections import deque
from datetime import datetime, timedelta
import time
import fnmatch
import re
import ctypes
import ctypes.util

__author__ = 'seb@dbzteam.org (Sebastien Martini)'
__version__ = '0.8.6'
__metaclass__ = type

# Load libc and check its version: glibc >= 2.4 is required for inotify.
LIBC = ctypes.cdll.LoadLibrary(ctypes.util.find_library('c'))
LIBC.gnu_get_libc_version.restype = ctypes.c_char_p
LIBC_VERSION = LIBC.gnu_get_libc_version()
if (int(LIBC_VERSION.split('.')[0]) < 2 or
        (int(LIBC_VERSION.split('.')[0]) == 2 and
         int(LIBC_VERSION.split('.')[1]) < 4)):
    raise UnsupportedLibcVersionError(LIBC_VERSION)

# Logging: print messages of level INFO and above on the console.
log = logging.getLogger('pyinotify')
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
log.addHandler(console_handler)
log.setLevel(logging.INFO)

# Optional psyco speedup.  The constant-false guard leaves the import
# disabled in this build.
try:
    if False:
        import psyco
        psyco.full()
except ImportError:
    pass


class SysCtlINotify:
    """
    Access (read, write) inotify's variables through sysctl.

    Examples:
      - Read variable: myvar = max_queued_events.value
      - Update variable: max_queued_events.value = 42
    """

    inotify_attrs = {'max_user_instances': 1,
                     'max_user_watches': 2,
                     'max_queued_events': 3}

    def __init__(self, attrname):
        sino = ctypes.c_int * 3
        self._attrname = attrname
        self._attr = sino(5, 20, SysCtlINotify.inotify_attrs[attrname])

    def get_val(self):
        '''
        @return: stored value.
        @rtype: int
        '''
        oldv = ctypes.c_int(0)
        size = ctypes.c_int(ctypes.sizeof(oldv))
        LIBC.sysctl(self._attr, 3,
                    ctypes.c_voidp(ctypes.addressof(oldv)),
                    ctypes.addressof(size),
                    None, 0)
        return oldv.value

    def set_val(self, nval):
        '''
        @param nval: set to nval.
        @type nval: int
        '''
        oldv = ctypes.c_int(0)
        sizeo = ctypes.c_int(ctypes.sizeof(oldv))
        newv = ctypes.c_int(nval)
        sizen = ctypes.c_int(ctypes.sizeof(newv))
        LIBC.sysctl(self._attr, 3,
                    ctypes.c_voidp(ctypes.addressof(oldv)),
                    ctypes.addressof(sizeo),
                    ctypes.c_voidp(ctypes.addressof(newv)),
                    ctypes.addressof(sizen))

    value = property(get_val, set_val)

    def __repr__(self):
        return '<%s=%d>' % (self._attrname, self.get_val())


# Expose the three inotify sysctl variables as module-level objects.
for attrname in ('max_queued_events', 'max_user_instances',
                 'max_user_watches'):
    globals()[attrname] = SysCtlINotify(attrname)


def iglob(pathname):
    if not has_magic(pathname):
        if hasattr(os.path, 'lexists'):
            if os.path.lexists(pathname):
                yield pathname
        elif os.path.islink(pathname) or os.path.exists(pathname):
            yield pathname
        return
    dirname, basename = os.path.split(pathname)
    if not dirname:
        return
    # Expand a magic dirname recursively, as in the stdlib glob module; this
    # restores the definition of 'dirs' used by the loop below.
    if has_magic(dirname):
        dirs = iglob(dirname)
    else:
        dirs = [dirname]
    if has_magic(basename):
        glob_in_dir = glob1
    else:
        glob_in_dir = glob0
    for dirname in dirs:
        for name in glob_in_dir(dirname, basename):
            yield os.path.join(dirname, name)


def glob1(dirname, pattern):
    if not dirname:
        dirname = os.curdir
    try:
        names = os.listdir(dirname)
    except os.error:
        return []
    return fnmatch.filter(names, pattern)


def glob0(dirname, basename):
    if basename == '' and os.path.isdir(dirname):
        # os.path.split() returns an empty basename for paths ending with a
        # directory separator; 'q*x/' should match only directories.
        return [basename]
    if hasattr(os.path, 'lexists'):
        if os.path.lexists(os.path.join(dirname, basename)):
            return [basename]
    elif (os.path.islink(os.path.join(dirname, basename)) or
          os.path.exists(os.path.join(dirname, basename))):
        return [basename]
    return []


magic_check = re.compile('[*?[]')


def has_magic(s):
    return magic_check.search(s) is not None


class EventsCodes:
    """
    Set of codes corresponding to each kind of events.
    Some of these flags are used to communicate with inotify, whereas
    the others are sent to userspace by inotify notifying some events.

    @cvar IN_ACCESS: File was accessed.
    @type IN_ACCESS: int
    @cvar IN_MODIFY: File was modified.
    @type IN_MODIFY: int
    @cvar IN_ATTRIB: Metadata changed.
    @type IN_ATTRIB: int
    @cvar IN_CLOSE_WRITE: Writable file was closed.
    @type IN_CLOSE_WRITE: int
    @cvar IN_CLOSE_NOWRITE: Unwritable file closed.
    @type IN_CLOSE_NOWRITE: int
    @cvar IN_OPEN: File was opened.
    @type IN_OPEN: int
    @cvar IN_MOVED_FROM: File was moved from X.
    @type IN_MOVED_FROM: int
    @cvar IN_MOVED_TO: File was moved to Y.
    @type IN_MOVED_TO: int
    @cvar IN_CREATE: Subfile was created.
    @type IN_CREATE: int
    @cvar IN_DELETE: Subfile was deleted.
    @type IN_DELETE: int
    @cvar IN_DELETE_SELF: Self (watched item itself) was deleted.
    @type IN_DELETE_SELF: int
    @cvar IN_MOVE_SELF: Self (watched item itself) was moved.
    @type IN_MOVE_SELF: int
    @cvar IN_UNMOUNT: Backing fs was unmounted.
    @type IN_UNMOUNT: int
    @cvar IN_Q_OVERFLOW: Event queue overflowed.
    @type IN_Q_OVERFLOW: int
    @cvar IN_IGNORED: File was ignored.
    @type IN_IGNORED: int
    @cvar IN_ONLYDIR: Only watch the path if it is a directory (new
                      in kernel 2.6.15).
    @type IN_ONLYDIR: int
    @cvar IN_DONT_FOLLOW: Don't follow a symlink (new in kernel 2.6.15);
                          combined with IN_ONLYDIR we can make sure that we
                          don't watch the target of symlinks.
    @type IN_DONT_FOLLOW: int
    @cvar IN_MASK_ADD: Add to the mask of an already existing watch (new
                       in kernel 2.6.14).
    @type IN_MASK_ADD: int
    @cvar IN_ISDIR: Event occurred against dir.
    @type IN_ISDIR: int
    @cvar IN_ONESHOT: Only send event once.
    @type IN_ONESHOT: int
    @cvar ALL_EVENTS: Alias for considering all of the events.
@type ALL_EVENTS: int """ FLAG_COLLECTIONS = { 'OP_FLAGS': { 'IN_ACCESS': 1, 'IN_MODIFY': 2, 'IN_ATTRIB': 4, 'IN_CLOSE_WRITE': 8, 'IN_CLOSE_NOWRITE': 16, 'IN_OPEN': 32, 'IN_MOVED_FROM': 64, 'IN_MOVED_TO': 128, 'IN_CREATE': 256, 'IN_DELETE': 512, 'IN_DELETE_SELF': 1024, 'IN_MOVE_SELF': 2048 }, 'EVENT_FLAGS': { 'IN_UNMOUNT': 8192, 'IN_Q_OVERFLOW': 16384, 'IN_IGNORED': 32768 }, 'SPECIAL_FLAGS': { 'IN_ONLYDIR': 16777216, 'IN_DONT_FOLLOW': 33554432, 'IN_MASK_ADD': 536870912, 'IN_ISDIR': 1073741824, 'IN_ONESHOT': 0x80000000L } } def maskname(mask): ''' Return the event name associated to mask. IN_ISDIR is appended when appropriate. Note: only one event is returned, because only one is raised once at a time. @param mask: mask. @type mask: int @return: event name. @rtype: str ''' ms = mask name = '%s' if mask & IN_ISDIR: ms = mask - IN_ISDIR name = '%s|IN_ISDIR' return name % EventsCodes.ALL_VALUES[ms] maskname = staticmethod(maskname) EventsCodes.ALL_FLAGS = { } EventsCodes.ALL_VALUES = { } for flagc, valc in EventsCodes.FLAG_COLLECTIONS.iteritems(): setattr(EventsCodes, flagc, valc) EventsCodes.ALL_FLAGS.update(valc) for name, val in valc.iteritems(): globals()[name] = val EventsCodes.ALL_VALUES[val] = name ALL_EVENTS = reduce((lambda x, y: x | y), EventsCodes.OP_FLAGS.itervalues()) EventsCodes.ALL_FLAGS['ALL_EVENTS'] = ALL_EVENTS EventsCodes.ALL_VALUES[ALL_EVENTS] = 'ALL_EVENTS' class _Event: ''' Event structure, represent events raised by the system. This is the base class and should be subclassed. ''' def __init__(self, dict_): ''' Attach attributes (contained in dict_) to self. ''' for tpl in dict_.iteritems(): setattr(self, *tpl) def __repr__(self): ''' @return: String representation. @rtype: str ''' s = '' for attr, value in sorted(self.__dict__.items(), key = (lambda x: x[0])): if attr.startswith('_'): continue if attr == 'mask': value = hex(getattr(self, attr)) elif isinstance(value, str) and not value: value = "''" s += ' %s%s%s' % (Color.FieldName(attr), Color.Punctuation('='), Color.FieldValue(value)) s = '%s%s%s %s' % (Color.Punctuation('<'), Color.ClassName(self.__class__.__name__), s, Color.Punctuation('>')) return s class _RawEvent(_Event): """ Raw event, it contains only the informations provided by the system. It doesn't infer anything. """ def __init__(self, wd, mask, cookie, name): ''' @param wd: Watch Descriptor. @type wd: int @param mask: Bitmask of events. @type mask: int @param cookie: Cookie. @type cookie: int @param name: Basename of the file or directory against which the event was raised, in case where the watched directory is the parent directory. None if the event was raised on the watched item itself. @type name: string or None ''' super(_RawEvent, self).__init__({ 'wd': wd, 'mask': mask, 'cookie': cookie, 'name': name.rstrip('\x00') }) log.debug(repr(self)) class Event(_Event): """ This class contains all the useful informations about the observed event. However, the incorporation of each field is not guaranteed and depends on the type of event. In effect, some fields are irrelevant for some kind of event (for example 'cookie' is meaningless for IN_CREATE whereas it is useful for IN_MOVE_TO). The possible fields are: - wd (int): Watch Descriptor. - mask (int): Mask. - maskname (str): Readable event name. - path (str): path of the file or directory being watched. - name (str): Basename of the file or directory against which the event was raised, in case where the watched directory is the parent directory. None if the event was raised on the watched item itself. 
This field is always provided even if the string is ''. - pathname (str): absolute path of: path + name - cookie (int): Cookie. - dir (bool): is the event raised against directory. """ def __init__(self, raw): ''' Concretely, this is the raw event plus inferred infos. ''' _Event.__init__(self, raw) self.maskname = EventsCodes.maskname(self.mask) try: if self.name: self.pathname = os.path.abspath(os.path.join(self.path, self.name)) else: self.pathname = os.path.abspath(self.path) except AttributeError: pass class ProcessEventError(PyinotifyError): ''' ProcessEventError Exception. Raised on ProcessEvent error. ''' def __init__(self, err): ''' @param err: Exception error description. @type err: string ''' PyinotifyError.__init__(self, err) class _ProcessEvent: ''' Abstract processing event class. ''' def __call__(self, event): ''' To behave like a functor the object must be callable. This method is a dispatch method. Lookup order: 1. process_MASKNAME method 2. process_FAMILY_NAME method 3. otherwise call process_default @param event: Event to be processed. @type event: Event object @return: By convention when used from the ProcessEvent class: - Returning False or None (default value) means keep on executing next chained functors (see chain.py example). - Returning True instead means do not execute next processing functions. @rtype: bool @raise ProcessEventError: Event object undispatchable, unknown event. ''' stripped_mask = event.mask - (event.mask & IN_ISDIR) maskname = EventsCodes.ALL_VALUES.get(stripped_mask) if maskname is None: raise ProcessEventError('Unknown mask 0x%08x' % stripped_mask) maskname is None meth = getattr(self, 'process_' + maskname, None) if meth is not None: return meth(event) meth = getattr(self, 'process_IN_' + maskname.split('_')[1], None) if meth is not None: return meth(event) return self.process_default(event) def __repr__(self): return '<%s>' % self.__class__.__name__ class _SysProcessEvent(_ProcessEvent): ''' There is three kind of processing according to each event: 1. special handling (deletion from internal container, bug, ...). 2. default treatment: which is applied to most of events. 4. IN_ISDIR is never sent alone, he is piggybacked with a standart event, he is not processed as the others events, instead, its value is captured and appropriately aggregated to dst event. ''' def __init__(self, wm, notifier): ''' @param wm: Watch Manager. @type wm: WatchManager instance @param notifier: notifier. @type notifier: Instance of Notifier. ''' self._watch_manager = wm self._notifier = notifier self._mv_cookie = { } self._mv = { } def cleanup(self): ''' Cleanup (delete) old (>1mn) records contained in self._mv_cookie and self._mv. ''' date_cur_ = datetime.now() for seq in [ self._mv_cookie, self._mv]: for k in seq.keys(): if date_cur_ - seq[k][1] > timedelta(minutes = 1): log.debug('cleanup: deleting entry %s' % seq[k][0]) del seq[k] continue def process_IN_CREATE(self, raw_event): """ If the event concerns a directory and the auto_add flag of the targetted watch is set to True, a new watch is added on this new directory, with the same attributes's values than those of this watch. 
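        For instance (illustrative paths only, not prescribed by the module):
        with a watch added through add_watch('/tmp/dir', ALL_EVENTS, rec=True,
        auto_add=True), creating the directory /tmp/dir/new automatically
        results in a watch being added on /tmp/dir/new as well.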
""" if raw_event.mask & IN_ISDIR: watch_ = self._watch_manager._wmd.get(raw_event.wd) if watch_.auto_add: addw = self._watch_manager.add_watch newwd = addw(os.path.join(watch_.path, raw_event.name), watch_.mask, proc_fun = watch_.proc_fun, rec = False, auto_add = watch_.auto_add) base = os.path.join(watch_.path, raw_event.name) if newwd[base] > 0: for name in os.listdir(base): inner = os.path.join(base, name) if os.path.isdir(inner) and self._watch_manager.get_wd(inner) is None: rawevent = _RawEvent(newwd[base], IN_CREATE | IN_ISDIR, 0, name) self._notifier._eventq.append(rawevent) continue return self.process_default(raw_event) def process_IN_MOVED_FROM(self, raw_event): ''' Map the cookie with the source path (+ date for cleaning). ''' watch_ = self._watch_manager._wmd.get(raw_event.wd) path_ = watch_.path src_path = os.path.normpath(os.path.join(path_, raw_event.name)) self._mv_cookie[raw_event.cookie] = (src_path, datetime.now()) return self.process_default(raw_event, { 'cookie': raw_event.cookie }) def process_IN_MOVED_TO(self, raw_event): ''' Map the source path with the destination path (+ date for cleaning). ''' watch_ = self._watch_manager._wmd.get(raw_event.wd) path_ = watch_.path dst_path = os.path.normpath(os.path.join(path_, raw_event.name)) mv_ = self._mv_cookie.get(raw_event.cookie) if mv_: self._mv[mv_[0]] = (dst_path, datetime.now()) return self.process_default(raw_event, { 'cookie': raw_event.cookie }) def process_IN_MOVE_SELF(self, raw_event): """ STATUS: the following bug has been fixed in the recent kernels (fixme: which version ?). Now it raises IN_DELETE_SELF instead. Old kernels are bugged, this event is raised when the watched item was moved, so we must update its path, but under some circumstances it can be impossible: if its parent directory and its destination directory aren't watched. The kernel (see include/linux/fsnotify.h) doesn't bring us enough informations like the destination path of moved items. """ watch_ = self._watch_manager._wmd.get(raw_event.wd) src_path = watch_.path mv_ = self._mv.get(src_path) if mv_: watch_.path = mv_[0] else: log.error('The path %s of this watch %s must not be trusted anymore' % (watch_.path, watch_)) if not watch_.path.endswith('-wrong-path'): watch_.path += '-wrong-path' return self.process_default(raw_event) def process_IN_Q_OVERFLOW(self, raw_event): ''' Only signal overflow, most of the common flags are irrelevant for this event (path, wd, name). ''' return Event({ 'mask': raw_event.mask }) def process_IN_IGNORED(self, raw_event): ''' The watch descriptor raised by this event is now ignored (forever), it can be safely deleted from watch manager dictionary. After this event we can be sure that neither the event queue neither the system will raise an event associated to this wd. ''' event_ = self.process_default(raw_event) try: del self._watch_manager._wmd[raw_event.wd] except KeyError: err = None log.error(err) return event_ def process_default(self, raw_event, to_append = { }): ''' Common handling for the following events: IN_ACCESS, IN_MODIFY, IN_ATTRIB, IN_CLOSE_WRITE, IN_CLOSE_NOWRITE, IN_OPEN, IN_DELETE, IN_DELETE_SELF, IN_UNMOUNT. 
''' ret = None watch_ = self._watch_manager._wmd.get(raw_event.wd) if raw_event.mask & (IN_DELETE_SELF | IN_MOVE_SELF): dir_ = watch_.dir else: dir_ = bool(raw_event.mask & IN_ISDIR) dict_ = { 'wd': raw_event.wd, 'mask': raw_event.mask, 'path': watch_.path, 'name': raw_event.name, 'dir': dir_ } dict_.update(to_append) return Event(dict_) class ProcessEvent(_ProcessEvent): """ Process events objects, can be specialized via subclassing, thus its behavior can be overriden: Note: you should not override __init__ in your subclass instead define a my_init() method, this method will be called from the constructor of this class with optional parameters. 1. Provide methods, e.g. process_IN_DELETE for processing a given kind of event (eg. IN_DELETE in this case). 2. Or/and provide methods for processing events by 'family', e.g. process_IN_CLOSE method will process both IN_CLOSE_WRITE and IN_CLOSE_NOWRITE events (if process_IN_CLOSE_WRITE and process_IN_CLOSE_NOWRITE aren't defined). 3. Or/and override process_default for processing the remaining kind of events. """ pevent = None def __init__(self, pevent = None, **kargs): ''' Enable chaining of ProcessEvent instances. @param pevent: optional callable object, will be called on event processing (before self). @type pevent: callable @param kargs: optional arguments delagated to template method my_init @type kargs: dict ''' self.pevent = pevent self.my_init(**kargs) def my_init(self, **kargs): """ Override this method when subclassing if you want to achieve custom initialization of your subclass' instance. You MUST pass keyword arguments. This method does nothing by default. @param kargs: optional arguments delagated to template method my_init @type kargs: dict """ pass def __call__(self, event): stop_chaining = False if self.pevent is not None: stop_chaining = self.pevent(event) if not stop_chaining: return _ProcessEvent.__call__(self, event) def nested_pevent(self): return self.pevent def process_default(self, event): ''' Default default processing event method. Print event on standart output. @param event: Event to be processed. @type event: Event instance ''' print repr(event) class ChainIfTrue(ProcessEvent): ''' Makes conditional chaining depending on the result of the nested processing instance. 
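        Usage sketch (illustrative; MyHandler and the lambda below are
        hypothetical names, not part of this module): passed as the pevent
        argument of another ProcessEvent subclass, ChainIfTrue lets that
        subclass process an event only when func(event) returns True:

            handler = MyHandler(ChainIfTrue(func=lambda ev: ev.name == 'foo.pid'))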
''' def my_init(self, func): self._func = func def process_default(self, event): return not self._func(event) class Stats(ProcessEvent): def my_init(self): self._start_time = time.time() self._stats = { } self._stats_lock = threading.Lock() def process_default(self, event): self._stats_lock.acquire() try: events = event.maskname.split('|') for event_name in events: count = self._stats.get(event_name, 0) self._stats[event_name] = count + 1 finally: self._stats_lock.release() def _stats_copy(self): self._stats_lock.acquire() try: return self._stats.copy() finally: self._stats_lock.release() def __repr__(self): stats = self._stats_copy() t = int(time.time() - self._start_time) if t < 60: ts = str(t) + 'sec' elif t <= t: pass elif t < 3600: ts = '%dmn%dsec' % (t / 60, t % 60) elif t <= t: pass elif t < 86400: ts = '%dh%dmn' % (t / 3600, (t % 3600) / 60) elif t >= 86400: ts = '%dd%dh' % (t / 86400, (t % 86400) / 3600) stats['ElapsedTime'] = ts l = [] for ev, value in sorted(stats.items(), key = (lambda x: x[0])): l.append(' %s=%s' % (Color.FieldName(ev), Color.FieldValue(value))) s = '<%s%s >' % (Color.ClassName(self.__class__.__name__), ''.join(l)) return s def dump(self, filename): fo = file(filename, 'wb') try: fo.write(str(self)) finally: fo.close() def __str__(self, scale = 45): stats = self._stats_copy() if not stats: return '' m = max(stats.values()) if not int(round(float(m) / scale)): pass unity = 1 fmt = '%%-26s%%-%ds%%s' % (len(Color.FieldValue('@' * scale)) + 1) def func(x): return fmt % (Color.FieldName(x[0]), Color.FieldValue('@' * (x[1] / unity)), Color.Simple('%d' % x[1], 'yellow')) s = '\n'.join(map(func, sorted(stats.items(), key = (lambda x: x[0])))) return s class NotifierError(PyinotifyError): ''' Notifier Exception. Raised on Notifier error. ''' def __init__(self, err): """ @param err: Exception string's description. @type err: string """ PyinotifyError.__init__(self, err) class Notifier: ''' Read notifications, process events. ''' def __init__(self, watch_manager, default_proc_fun = ProcessEvent(), read_freq = 0, treshold = 0, timeout = None): ''' Initialization. read_freq, treshold and timeout parameters are used when looping. @param watch_manager: Watch Manager. @type watch_manager: WatchManager instance @param default_proc_fun: Default processing method. @type default_proc_fun: instance of ProcessEvent @param read_freq: if read_freq == 0, events are read asap, if read_freq is > 0, this thread sleeps max(0, read_freq - timeout) seconds. But if timeout is None it can be different because poll is blocking waiting for something to read. @type read_freq: int @param treshold: File descriptor will be read only if its size to read is >= treshold. If != 0, you likely want to use it in combination with read_freq because without that you keep looping without really reading anything and that until the amount to read is >= treshold. At least with read_freq you may sleep. 
@type treshold: int @param timeout: http://docs.python.org/lib/poll-objects.html#poll-objects @type timeout: int ''' self._watch_manager = watch_manager self._fd = self._watch_manager._fd self._pollobj = select.poll() self._pollobj.register(self._fd, select.POLLIN) self._pipe = (-1, -1) self._eventq = deque() self._sys_proc_fun = _SysProcessEvent(self._watch_manager, self) self._default_proc_fun = default_proc_fun self._read_freq = read_freq self._treshold = treshold self._timeout = timeout def proc_fun(self): return self._default_proc_fun def check_events(self): ''' Check for new events available to read, blocks up to timeout milliseconds. @return: New events to read. @rtype: bool ''' while True: try: ret = self._pollobj.poll(self._timeout) except select.error: err = None if err[0] == errno.EINTR: continue else: raise err[0] == errno.EINTR break if not ret or self._pipe[0] == ret[0][0]: return False return ret[0][1] & select.POLLIN def read_events(self): ''' Read events from device, build _RawEvents, and enqueue them. ''' buf_ = array.array('i', [ 0]) if fcntl.ioctl(self._fd, termios.FIONREAD, buf_, 1) == -1: return None queue_size = buf_[0] if queue_size < self._treshold: log.debug('(fd: %d) %d bytes available to read but treshold is fixed to %d bytes' % (self._fd, queue_size, self._treshold)) return None try: r = os.read(self._fd, queue_size) except Exception: queue_size < self._treshold msg = queue_size < self._treshold fcntl.ioctl(self._fd, termios.FIONREAD, buf_, 1) == -1 raise NotifierError(msg) except: queue_size < self._treshold log.debug('event queue size: %d' % queue_size) rsum = 0 while rsum < queue_size: s_size = 16 s_ = struct.unpack('iIII', r[rsum:rsum + s_size]) fname_len = s_[3] s_ = s_[:-1] s_ += struct.unpack('%ds' % fname_len, r[rsum + s_size:rsum + s_size + fname_len]) self._eventq.append(_RawEvent(*s_)) rsum += s_size + fname_len continue queue_size < self._treshold def process_events(self): ''' Routine for processing events from queue by calling their associated proccessing function (instance of ProcessEvent). It also do internal processings, to keep the system updated. ''' while self._eventq: raw_event = self._eventq.popleft() watch_ = self._watch_manager._wmd.get(raw_event.wd) revent = self._sys_proc_fun(raw_event) if watch_ and watch_.proc_fun: watch_.proc_fun(revent) continue self._default_proc_fun(revent) self._sys_proc_fun.cleanup() def __daemonize(self, pid_file = None, force_kill = False, stdin = os.devnull, stdout = os.devnull, stderr = os.devnull): ''' pid_file: file to which pid will be written. force_kill: if True kill the process associated to pid_file. stdin, stdout, stderr: files associated to common streams. 
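        This method is reached through loop(daemonize=True); extra keyword
        arguments given to loop() are forwarded here, e.g. (illustrative path
        only): notifier.loop(daemonize=True, pid_file='/tmp/pyinotify.pid')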
''' if pid_file is None: dirname = '/var/run/' if not sys.argv[0]: pass basename = 'pyinotify' pid_file = os.path.join(dirname, basename + '.pid') def fork_daemon(): pid = os.fork() if pid == 0: os.setsid() pid = os.fork() if pid == 0: os.chdir('/') os.umask(0) else: os._exit(0) else: os._exit(0) fd_inp = os.open(stdin, os.O_RDONLY) os.dup2(fd_inp, 0) fd_out = os.open(stdout, os.O_WRONLY | os.O_CREAT) os.dup2(fd_out, 1) fd_err = os.open(stderr, os.O_WRONLY | os.O_CREAT) os.dup2(fd_err, 2) fork_daemon() fo = file(pid_file, 'wb') try: fo.write(str(os.getpid()) + '\n') finally: fo.close() (atexit.register,)((lambda : os.unlink(pid_file))) def _sleep(self, ref_time): if self._read_freq > 0: cur_time = time.time() sleep_amount = self._read_freq - cur_time - ref_time if sleep_amount > 0: log.debug('Now sleeping %d seconds' % sleep_amount) time.sleep(sleep_amount) def loop(self, callback = None, daemonize = False, **args): ''' Events are read only once time every min(read_freq, timeout) seconds at best and only if the size to read is >= treshold. @param callback: Functor called after each event processing. Expects to receive notifier object (self) as first parameter. @type callback: callable @param daemonize: This thread is daemonized if set to True. @type daemonize: boolean ''' if daemonize: self._Notifier__daemonize(**args) while None: try: self.process_events() if callback is not None: callback(self) ref_time = time.time() if self.check_events(): self._sleep(ref_time) self.read_events() continue except KeyboardInterrupt: log.debug('Pyinotify stops monitoring.') self.stop() break continue return None def stop(self): """ Close the inotify's instance (close its file descriptor). It destroys all existing watches, pending events,... """ self._pollobj.unregister(self._fd) os.close(self._fd) class ThreadedNotifier(threading.Thread, Notifier): ''' This notifier inherits from threading.Thread for instantiating a separate thread, and also inherits from Notifier, because it is a threaded notifier. Note that everything possible with this class is also possible through Notifier. Moreover Notifier is _better_ under many aspects: not threaded, can be easily daemonized. ''' def __init__(self, watch_manager, default_proc_fun = ProcessEvent(), read_freq = 0, treshold = 0, timeout = None): ''' Initialization, initialize base classes. read_freq, treshold and timeout parameters are used when looping. @param watch_manager: Watch Manager. @type watch_manager: WatchManager instance @param default_proc_fun: Default processing method. @type default_proc_fun: instance of ProcessEvent @param read_freq: if read_freq == 0, events are read asap, if read_freq is > 0, this thread sleeps max(0, read_freq - timeout) seconds. @type read_freq: int @param treshold: File descriptor will be read only if its size to read is >= treshold. If != 0, you likely want to use it in combination with read_freq because without that you keep looping without really reading anything and that until the amount to read is >= treshold. At least with read_freq you may sleep. @type treshold: int @param timeout: see http://docs.python.org/lib/poll-objects.html#poll-objects Read the corresponding comment in the source code before changing it. @type timeout: int ''' threading.Thread.__init__(self) self._stop_event = threading.Event() Notifier.__init__(self, watch_manager, default_proc_fun, read_freq, treshold, timeout) self._pipe = os.pipe() self._pollobj.register(self._pipe[0], select.POLLIN) def stop(self): """ Stop the notifier's loop. 
Stop notification. Join the thread. """ self._stop_event.set() os.write(self._pipe[1], 'stop') threading.Thread.join(self) Notifier.stop(self) self._pollobj.unregister(self._pipe[0]) os.close(self._pipe[0]) os.close(self._pipe[1]) def loop(self): """ Thread's main loop. Don't meant to be called by user directly. Call start() instead. Events are read only once time every min(read_freq, timeout) seconds at best and only if the size of events to read is >= treshold. """ while not self._stop_event.isSet(): self.process_events() ref_time = time.time() if self.check_events(): self._sleep(ref_time) self.read_events() continue def run(self): """ Start the thread's loop: read and process events until the method stop() is called. Never call this method directly, instead call the start() method inherited from threading.Thread, which then will call run(). """ self.loop() class Watch: ''' Represent a watch, i.e. a file or directory being watched. ''' def __init__(self, **keys): ''' Initializations. @param wd: Watch descriptor. @type wd: int @param path: Path of the file or directory being watched. @type path: str @param mask: Mask. @type mask: int @param proc_fun: Processing callable object. @type proc_fun: @param auto_add: Automatically add watches on new directories. @type auto_add: bool ''' for k, v in keys.iteritems(): setattr(self, k, v) self.dir = os.path.isdir(self.path) def __repr__(self): ''' @return: String representation. @rtype: str ''' s = [](_[1]) s = '%s%s %s %s' % (Color.Punctuation('<'), Color.ClassName(self.__class__.__name__), s, Color.Punctuation('>')) return s class ExcludeFilter: ''' ExcludeFilter is an exclusion filter. ''' def __init__(self, arg_lst): """ @param arg_lst: is either a list or dict of patterns: [pattern1, ..., patternn] {'filename1': (list1, listn), ...} where list1 is a list of patterns @type arg_lst: list or dict """ if isinstance(arg_lst, dict): lst = self._load_patterns(arg_lst) elif isinstance(arg_lst, list): lst = arg_lst else: raise TypeError self._lregex = isinstance(arg_lst, dict) for regex in lst: self._lregex.append(re.compile(regex, re.UNICODE)) def _load_patterns(self, dct): lst = [] for path, varnames in dct.iteritems(): loc = { } execfile(path, { }, loc) for varname in varnames: lst.extend(loc.get(varname, [])) return lst def _match(self, regex, path): return regex.match(path) is not None def __call__(self, path): ''' @param path: path to match against regexps. @type path: str @return: return True is path has been matched and should be excluded, False otherwise. @rtype: bool ''' for regex in self._lregex: if self._match(regex, path): return True return False class WatchManagerError(Exception): ''' WatchManager Exception. Raised on error encountered on watches operations. ''' def __init__(self, msg, wmd): """ @param msg: Exception string's description. @type msg: string @param wmd: Results of previous operations made by the same function on previous wd or paths. It also contains the item which raised this exception. @type wmd: dict """ self.wmd = wmd Exception.__init__(self, msg) class WatchManager: ''' Provide operations for watching files and directories. Integrated dictionary is used to reference watched items. ''' def __init__(self, exclude_filter = (lambda path: False)): ''' Initialization: init inotify, init watch manager dictionary. Raise OSError if initialization fails. @param exclude_filter: boolean function, returns True if current path must be excluded from being watched. 
Convenient for providing a common exclusion filter for every call to add_watch. @type exclude_filter: bool ''' self._exclude_filter = exclude_filter self._wmd = { } self._fd = LIBC.inotify_init() if self._fd < 0: raise OSError() self._fd < 0 def __add_watch(self, path, mask, proc_fun, auto_add): ''' Add a watch on path, build a Watch object and insert it in the watch manager dictionary. Return the wd value. ''' wd_ = LIBC.inotify_add_watch(self._fd, ctypes.create_string_buffer(path), mask) if wd_ < 0: return wd_ watch_ = Watch(wd = wd_, path = os.path.normpath(path), mask = mask, proc_fun = proc_fun, auto_add = auto_add) self._wmd[wd_] = watch_ log.debug('New %s' % watch_) return wd_ def __glob(self, path, do_glob): if do_glob: return iglob(path) return [ path] def add_watch(self, path, mask, proc_fun = None, rec = False, auto_add = False, do_glob = False, quiet = True, exclude_filter = None): """ Add watch(s) on given path(s) with the specified mask and optionnally with a processing function and recursive flag. @param path: Path to watch, the path can either be a file or a directory. Also accepts a sequence (list) of paths. @type path: string or list of string @param mask: Bitmask of events. @type mask: int @param proc_fun: Processing object. @type proc_fun: function or ProcessEvent instance or instance of one of its subclasses or callable object. @param rec: Recursively add watches from path on all its subdirectories, set to False by default (doesn't follows symlinks). @type rec: bool @param auto_add: Automatically add watches on newly created directories in the watch's path. @type auto_add: bool @param do_glob: Do globbing on pathname. @type do_glob: bool @param quiet: if True raise an WatchManagerError exception on error. See example not_quiet.py @type quiet: bool @param exclude_filter: boolean function, returns True if current path must be excluded from being watched. Has precedence on exclude_filter defined into __init__. @type exclude_filter: bool @return: dict of paths associated to watch descriptors. A wd value is positive if the watch has been sucessfully added, otherwise the value is negative. If the path is invalid it will be not included into this dict. @rtype: dict of {str: int} """ ret_ = { } if exclude_filter is None: exclude_filter = self._exclude_filter for npath in self._WatchManager__format_param(path): for apath in self._WatchManager__glob(npath, do_glob): for rpath in self._WatchManager__walk_rec(apath, rec): if not exclude_filter(rpath): wd = ret_[rpath] = self._WatchManager__add_watch(rpath, mask, proc_fun, auto_add) if wd < 0: err = 'add_watch: cannot watch %s (WD=%d)' err = err % (rpath, wd) if quiet: log.error(err) else: raise WatchManagerError(err, ret_) quiet continue ret_[rpath] = -2 return ret_ def __get_sub_rec(self, lpath): """ Get every wd from self._wmd if its path is under the path of one (at least) of those in lpath. Doesn't follow symlinks. @param lpath: list of watch descriptor @type lpath: list of int @return: list of watch descriptor @rtype: list of int """ for d in lpath: root = self.get_path(d) if root: yield d if not os.path.isdir(root): continue root = os.path.normpath(root) lend = len(root) for iwd in self._wmd.items(): cur = iwd[1].path pref = os.path.commonprefix([ root, cur]) if (root == os.sep or len(pref) == lend) and len(cur) > lend and cur[lend] == os.sep: yield iwd[1].wd continue def update_watch(self, wd, mask = None, proc_fun = None, rec = False, auto_add = False, quiet = True): """ Update existing watch(s). 
Both the mask and the processing object can be modified. @param wd: Watch Descriptor to update. Also accepts a list of watch descriptors. @type wd: int or list of int @param mask: Optional new bitmask of events. @type mask: int @param proc_fun: Optional new processing function. @type proc_fun: function or ProcessEvent instance or instance of one of its subclasses or callable object. @param rec: Recursively update watches on every already watched subdirectories and subfiles. @type rec: bool @param auto_add: Automatically add watches on newly created directories in the watch's path. @type auto_add: bool @param quiet: if True raise an WatchManagerError exception on error. See example not_quiet.py @type quiet: bool @return: dict of watch descriptors associated to booleans values. True if the corresponding wd has been successfully updated, False otherwise. @rtype: dict of int: bool """ lwd = self._WatchManager__format_param(wd) if rec: lwd = self._WatchManager__get_sub_rec(lwd) ret_ = { } for awd in lwd: apath = self.get_path(awd) if not apath or awd < 0: err = 'update_watch: invalid WD=%d' % awd if quiet: log.error(err) continue raise WatchManagerError(err, ret_) awd < 0 if mask: addw = LIBC.inotify_add_watch wd_ = addw(self._fd, ctypes.create_string_buffer(apath), mask) if wd_ < 0: ret_[awd] = False err = 'update_watch: cannot update WD=%d (%s)' % (wd_, apath) if quiet: log.error(err) continue raise WatchManagerError(err, ret_) wd_ < 0 if not awd == wd_: raise AssertionError if proc_fun or auto_add: watch_ = self._wmd[awd] if proc_fun: watch_.proc_fun = proc_fun if auto_add: watch_.proc_fun = auto_add ret_[awd] = True log.debug('Updated watch - %s' % self._wmd[awd]) return ret_ def __format_param(self, param): ''' @param param: Parameter. @type param: string or int @return: wrap param. @rtype: list of type(param) ''' if isinstance(param, list): for p_ in param: yield p_ else: yield param def get_wd(self, path): ''' Returns the watch descriptor associated to path. This method has an prohibitive cost, always prefer to keep the WD. If path is unknown None is returned. @param path: path. @type path: str @return: WD or None. @rtype: int or None ''' path = os.path.normpath(path) for iwd in self._wmd.iteritems(): if iwd[1].path == path: return iwd[0] log.debug('get_wd: unknown path %s' % path) def get_path(self, wd): ''' Returns the path associated to WD, if WD is unknown None is returned. @param wd: watch descriptor. @type wd: int @return: path or None. @rtype: string or None ''' watch_ = self._wmd.get(wd) if watch_: return watch_.path log.debug('get_path: unknown WD %d' % wd) def __walk_rec(self, top, rec): """ Yields each subdirectories of top, doesn't follow symlinks. If rec is false, only yield top. @param top: root directory. @type top: string @param rec: recursive flag. @type rec: bool @return: path of one subdirectory. @rtype: string """ if not rec and os.path.islink(top) or not os.path.isdir(top): yield top else: for root, dirs, files in os.walk(top): yield root def rm_watch(self, wd, rec = False, quiet = True): ''' Removes watch(s). @param wd: Watch Descriptor of the file or directory to unwatch. Also accepts a list of WDs. @type wd: int or list of int. @param rec: Recursively removes watches on every already watched subdirectories and subfiles. @type rec: bool @param quiet: if True raise an WatchManagerError exception on error. See example not_quiet.py @type quiet: bool @return: dict of watch descriptors associated to booleans values. 
True if the corresponding wd has been successfully removed, False otherwise. @rtype: dict of int: bool ''' lwd = self._WatchManager__format_param(wd) if rec: lwd = self._WatchManager__get_sub_rec(lwd) ret_ = { } for awd in lwd: wd_ = LIBC.inotify_rm_watch(self._fd, awd) if wd_ < 0: ret_[awd] = False err = 'rm_watch: cannot remove WD=%d' % awd if quiet: log.error(err) continue raise WatchManagerError(err, ret_) wd_ < 0 ret_[awd] = True log.debug('watch WD=%d (%s) removed' % (awd, self.get_path(awd))) return ret_ def watch_transient_file(self, filename, mask, proc_class): """ Watch a transient file, which will be created and deleted frequently over time (e.g. pid file). @attention: Under the call to this function it will be impossible to correctly watch the events triggered into the same base directory than the directory where is located this watched transient file. For instance it would actually be wrong to make these two successive calls: wm.watch_transient_file('/var/run/foo.pid', ...) and wm.add_watch('/var/run/', ...) @param filename: Filename. @type filename: string @param mask: Bitmask of events, should contain IN_CREATE and IN_DELETE. @type mask: int @param proc_class: ProcessEvent (or of one of its subclass), beware of accepting a ProcessEvent's instance as argument into __init__, see transient_file.py example for more details. @type proc_class: ProcessEvent's instance or of one of its subclasses. @return: See add_watch(). @rtype: See add_watch(). """ dirname = os.path.dirname(filename) if dirname == '': return { } basename = os.path.basename(filename) mask |= IN_CREATE | IN_DELETE def cmp_name(event): return basename == event.name return self.add_watch(dirname, mask, proc_fun = proc_class(ChainIfTrue(func = cmp_name)), rec = False, auto_add = False, do_glob = False) class Color: normal = '\x1b[0m' black = '\x1b[30m' red = '\x1b[31m' green = '\x1b[32m' yellow = '\x1b[33m' blue = '\x1b[34m' purple = '\x1b[35m' cyan = '\x1b[36m' bold = '\x1b[1m' uline = '\x1b[4m' blink = '\x1b[5m' invert = '\x1b[7m' def Punctuation(s): return Color.normal + s + Color.normal Punctuation = staticmethod(Punctuation) def FieldValue(s): if not isinstance(s, str): s = str(s) return Color.purple + s + Color.normal FieldValue = staticmethod(FieldValue) def FieldName(s): return Color.blue + s + Color.normal FieldName = staticmethod(FieldName) def ClassName(s): return Color.red + Color.bold + s + Color.normal ClassName = staticmethod(ClassName) def Simple(s, color): if not isinstance(s, str): s = str(s) try: color_attr = getattr(Color, color) except AttributeError: return s return color_attr + s + Color.normal Simple = staticmethod(Simple) def command_line(): OptionParser = OptionParser import optparse usage = 'usage: %prog [options] [path1] [path2] [pathn]' parser = OptionParser(usage = usage) parser.add_option('-v', '--verbose', action = 'store_true', dest = 'verbose', help = 'Verbose mode') parser.add_option('-r', '--recursive', action = 'store_true', dest = 'recursive', help = 'Add watches recursively on paths') parser.add_option('-a', '--auto_add', action = 'store_true', dest = 'auto_add', help = 'Automatically add watches on new directories') parser.add_option('-e', '--events-list', metavar = 'EVENT[,...]', dest = 'events_list', help = 'A comma-separated list of events to watch for - see the documentation for valid options (defaults to everything)') parser.add_option('-s', '--stats', action = 'store_true', dest = 'stats', help = 'Display statistics') (options, args) = parser.parse_args() if 
options.verbose: log.setLevel(10) if len(args) < 1: path = '/tmp' else: path = args wm = WatchManager() if options.stats: notifier = Notifier(wm, default_proc_fun = Stats(), read_freq = 5) else: notifier = Notifier(wm) mask = 0 if options.events_list: events_list = options.events_list.split(',') for ev in events_list: evcode = EventsCodes.ALL_FLAGS.get(ev, 0) if evcode: mask |= evcode continue parser.error("The event '%s' specified with option -e is not valid" % ev) else: mask = ALL_EVENTS cb_fun = None if options.stats: def cb(s): print '%s\n%s\n' % (repr(s.proc_fun()), s.proc_fun()) cb_fun = cb log.debug('Start monitoring %s, (press c^c to halt pyinotify)' % path) wm.add_watch(path, mask, rec = options.recursive, auto_add = options.auto_add) notifier.loop(callback = cb_fun) if __name__ == '__main__': command_line()
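# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only: the path '/tmp' and the handler
# class below are arbitrary examples, not part of the module above):
#
#   class OnCloseWrite(ProcessEvent):
#       def process_IN_CLOSE_WRITE(self, event):
#           print 'closed after write:', event.pathname
#
#   wm = WatchManager()
#   notifier = Notifier(wm, default_proc_fun=OnCloseWrite())
#   wm.add_watch('/tmp', IN_CLOSE_WRITE, rec=False)
#   notifier.loop()
# ---------------------------------------------------------------------------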